/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.kafka;



import com.google.auto.service.AutoService;

import com.google.auto.value.AutoValue;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.*;

import com.bff.gaia.unified.sdk.expansion.ExternalTransformRegistrar;

import com.bff.gaia.unified.sdk.io.UnboundedSource;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.ValueProvider;

import com.bff.gaia.unified.sdk.transforms.*;

import com.bff.gaia.unified.sdk.transforms.display.DisplayData;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PBegin;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PDone;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Charsets;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Joiner;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableMap;

import com.bff.gaia.unified.sdk.coders.AtomicCoder;

import com.bff.gaia.unified.sdk.coders.ByteArrayCoder;

import com.bff.gaia.unified.sdk.coders.CannotProvideCoderException;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.coders.CoderRegistry;

import com.bff.gaia.unified.sdk.coders.KvCoder;

import com.bff.gaia.unified.sdk.coders.NullableCoder;

import com.bff.gaia.unified.sdk.coders.VarIntCoder;

import com.bff.gaia.unified.sdk.coders.VarLongCoder;

import com.bff.gaia.unified.sdk.transforms.DoFn;

import com.bff.gaia.unified.sdk.transforms.ExternalTransformBuilder;

import com.bff.gaia.unified.sdk.transforms.MapElements;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.SimpleFunction;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.Producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import org.apache.kafka.common.serialization.Deserializer;

import org.apache.kafka.common.serialization.Serializer;

import org.apache.kafka.common.serialization.StringSerializer;

import org.apache.kafka.common.utils.AppInfoParser;

import org.joda.time.Duration;

import org.joda.time.Instant;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import javax.annotation.Nullable;

import java.io.InputStream;

import java.io.OutputStream;

import java.lang.reflect.Method;

import java.lang.reflect.ParameterizedType;

import java.lang.reflect.Type;

import java.util.*;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.*;



/**

 * An unbounded source and a sink for <a href="http://kafka.apache.org/">Kafka</a> topics.

 *

 * <h3>Reading from Kafka topics</h3>

 *

 * <p>KafkaIO source returns an unbounded collection of Kafka records as {@code

 * PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic metadata like

 * topic-partition and offset, along with key and value associated with a Kafka record.

 *

 * <p>Although most applications consume a single topic, the source can be configured to consume

 * multiple topics or even a specific set of {@link TopicPartition}s.

 *

 * <p>To configure a Kafka source, you must specify at a minimum the Kafka <tt>bootstrapServers</tt>,

 * one or more topics to consume, and key and value deserializers. For example:

 *

 * <pre>{@code

 * pipeline

 *   .apply(KafkaIO.<Long, String>read()

 *      .withBootstrapServers("broker_1:9092,broker_2:9092")

 *      .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.

 *      .withKeyDeserializer(LongDeserializer.class)

 *      .withValueDeserializer(StringDeserializer.class)

 *

 *      // The above four settings are required. Returns PCollection<KafkaRecord<Long, String>>

 *

 *      // The rest of the settings are optional:

 *

 *      // you can further customize KafkaConsumer used to read the records by adding more

 *      // settings for ConsumerConfig, e.g.:

 *      .updateConsumerProperties(ImmutableMap.of("group.id", "my_unified_app_1"))

 *

 *      // set event times and watermark based on 'LogAppendTime'. To provide a custom

 *      // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.

 *      // Use withCreateTime() with topics that have 'CreateTime' timestamps.

 *      .withLogAppendTime()

 *

 *      // restrict reader to committed messages on Kafka (see method documentation).

 *      .withReadCommitted()

 *

 *      // offset consumed by the pipeline can be committed back.

 *      .commitOffsetsInFinalize()

 *

 *      // finally, if you don't need Kafka metadata, you can drop it.

 *      .withoutMetadata() // PCollection<KV<Long, String>>

 *   )

 *   .apply(Values.<String>create()) // PCollection<String>

 *    ...

 * }</pre>

 *

 * <p>Kafka provides deserializers for common types in {@link

 * org.apache.kafka.common.serialization}. In addition to deserializers, Unified runners need {@link

 * Coder} to materialize key and value objects if necessary. In most cases, you don't need to

 * specify {@link Coder} for key and value in the resulting collection because the coders are

 * inferred from deserializer types. However, in cases when coder inference fails, they can be

 * specified explicitly along with deserializers using {@link

 * Read#withKeyDeserializerAndCoder(Class, Coder)} and {@link

 * Read#withValueDeserializerAndCoder(Class, Coder)}. Note that Kafka messages are interpreted using

 * key and value <i>deserializers</i>.

 *

 * <h3>Partition Assignment and Checkpointing</h3>

 *

 * The Kafka partitions are evenly distributed among splits (workers).

 *

 * <p>Checkpointing is fully supported and each split can resume from previous checkpoint (to the

 * extent supported by runner). See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for

 * more details on splits and checkpoint support.

 *

 * <p>When the pipeline starts for the first time, or without any checkpoint, the source starts

 * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the

 * beginning by setting appropriate properties in {@link ConsumerConfig}, through {@link

 * Read#updateConsumerProperties(Map)}. You can also enable offset auto_commit in Kafka to resume

 * from the last committed offsets.
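 *
 * <p>For example, a minimal sketch of overriding the initial position to the earliest available
 * offsets ({@code auto.offset.reset} is a standard Kafka consumer property):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *     ...
 *     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
 * }</pre>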

 *

 * <p>In summary, KafkaIO.read follows the sequence below to set the initial offset:<br>

 * 1. {@link KafkaCheckpointMark} provided by runner;<br>

 * 2. Consumer offset stored in Kafka when {@code ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG = true};

 * <br>

 * 3. Start from <em>latest</em> offset by default;

 *

 * <h3>Writing to Kafka</h3>

 *

 * <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write just the

 * values or native Kafka producer records using {@link

 * org.apache.kafka.clients.producer.ProducerRecord}. To configure a Kafka sink, you must specify
 * at a minimum the Kafka <tt>bootstrapServers</tt>, the topic to write to, and key and value

 * serializers. For example:

 *

 * <pre>{@code

 * PCollection<KV<Long, String>> kvColl = ...;

 * kvColl.apply(KafkaIO.<Long, String>write()

 *      .withBootstrapServers("broker_1:9092,broker_2:9092")

 *      .withTopic("results")

 *

 *      .withKeySerializer(LongSerializer.class)

 *      .withValueSerializer(StringSerializer.class)

 *

 *      // You can further customize KafkaProducer used to write the records by adding more

 *      // settings for ProducerConfig, e.g. to enable compression:

 *      .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))

 *

 *      // You can set the publish timestamp for the Kafka records.

 *      .withInputTimestamp() // element timestamp is used while publishing to Kafka

 *      // or you can also set a custom timestamp with a function.

 *      .withPublishTimestampFunction((elem, elemTs) -> ...)

 *

 *      // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().

 *      .withEOS(20, "eos-sink-group-id");

 *   );

 * }</pre>

 *

 * <p>Often you might want to write just values without any keys to Kafka. Use {@code values()} to

 * write records with a default empty (null) key:

 *

 * <pre>{@code

 * PCollection<String> strings = ...;

 * strings.apply(KafkaIO.<Void, String>write()

 *     .withBootstrapServers("broker_1:9092,broker_2:9092")

 *     .withTopic("results")

 *     .withValueSerializer(StringSerializer.class) // just need serializer for value

 *     .values()

 *   );

 * }</pre>

 *

 * <p>Also, if you want to write Kafka {@link ProducerRecord} then you should use {@link

 * KafkaIO#writeRecords()}:

 *

 * <pre>{@code

 * PCollection<ProducerRecord<Long, String>> records = ...;

 * records.apply(KafkaIO.<Long, String>writeRecords()

 *     .withBootstrapServers("broker_1:9092,broker_2:9092")

 *     .withTopic("results")

 *     .withKeySerializer(LongSerializer.class)

 *     .withValueSerializer(StringSerializer.class)

 *   );

 * }</pre>

 *

 * <h3>Advanced Kafka Configuration</h3>

 *

 * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in {@link

 * ProducerConfig} for sink. E.g. if you would like to enable offset <em>auto commit</em> (for

 * external monitoring or other purposes), you can set <tt>"group.id"</tt>,

 * <tt>"enable.auto.commit"</tt>, etc.

 *

 * <h3>Event Timestamps and Watermark</h3>

 *

 * By default, the record timestamp (event time) is set to processing time in the KafkaIO reader and
 * the source watermark is the current wall time. If a topic has Kafka server-side ingestion
 * timestamps enabled ('LogAppendTime'), they can be used via {@link Read#withLogAppendTime()}. A custom timestamp

 * policy can be provided by implementing {@link TimestampPolicyFactory}. See {@link

 * Read#withTimestampPolicyFactory(TimestampPolicyFactory)} for more information.
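 *
 * <p>For example, a minimal sketch of reading topics that carry 'CreateTime' timestamps, allowing
 * up to one minute of out-of-order delay (the delay value is only illustrative):
 *
 * <pre>{@code
 * KafkaIO.<Long, String>read()
 *     ...
 *     .withCreateTime(Duration.standardMinutes(1))
 * }</pre>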

 *

 * <h3>Supported Kafka Client Versions</h3>

 *

 * KafkaIO relies on <i>kafka-clients</i> for all its interactions with the Kafka cluster.

 * <i>kafka-clients</i> versions 0.10.1 and newer are supported at runtime. The older versions 0.9.x

 * - 0.10.0.0 are also supported, but are deprecated and likely to be removed in the near future. Please

 * ensure that the version included with the application is compatible with the version of your

 * Kafka cluster. The Kafka client usually fails to initialize with a clear error message in case of

 * incompatibility.

 */

@Experimental(Experimental.Kind.SOURCE_SINK)

public class KafkaIO {



  /**

   * A specific instance of uninitialized {@link #read()} where keys and values are byte arrays. See
   * {@link #read()}.

   */

  public static Read<byte[], byte[]> readBytes() {

    return KafkaIO.<byte[], byte[]>read()

        .withKeyDeserializer(ByteArrayDeserializer.class)

        .withValueDeserializer(ByteArrayDeserializer.class);

  }



  /**

   * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration

   * should be set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}.

   * Other optional settings include key and value {@link Deserializer}s, custom timestamp and

   * watermark functions.

   */

  public static <K, V> Read<K, V> read() {

    return new AutoValue_KafkaIO_Read.Builder<K, V>()

        .setTopics(new ArrayList<>())

        .setTopicPartitions(new ArrayList<>())

        .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)

        .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)

        .setMaxNumRecords(Long.MAX_VALUE)

        .setCommitOffsetsInFinalizeEnabled(false)

        .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())

        .build();

  }



  /**

   * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration

   * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} along

   * with {@link Serializer}s for the (optional) key and the values.

   */

  public static <K, V> Write<K, V> write() {

    return new AutoValue_KafkaIO_Write.Builder<K, V>()

        .setWriteRecordsTransform(

            new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()

                .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)

                .setEOS(false)

                .setNumShards(0)

                .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)

                .build())

        .build();

  }



  /**

   * Creates an uninitialized {@link WriteRecords} {@link PTransform}. Before use, Kafka

   * configuration should be set with {@link WriteRecords#withBootstrapServers(String)} and {@link

   * WriteRecords#withTopic} along with {@link Serializer}s for the (optional) key and the values.

   */

  public static <K, V> WriteRecords<K, V> writeRecords() {

    return new AutoValue_KafkaIO_WriteRecords.Builder<K, V>()

        .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES)

        .setEOS(false)

        .setNumShards(0)

        .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)

        .build();

  }



  ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\



  /**

   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more information on

   * usage and configuration.

   */

  @AutoValue

  public abstract static class Read<K, V>

      extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {

    abstract Map<String, Object> getConsumerConfig();



    abstract List<String> getTopics();



    abstract List<TopicPartition> getTopicPartitions();



    @Nullable

    abstract Coder<K> getKeyCoder();



    @Nullable

    abstract Coder<V> getValueCoder();



    @Nullable

    abstract Class<? extends Deserializer<K>> getKeyDeserializer();



    @Nullable

    abstract Class<? extends Deserializer<V>> getValueDeserializer();



    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>

        getConsumerFactoryFn();



    @Nullable

    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getWatermarkFn();



    abstract long getMaxNumRecords();



    @Nullable

    abstract Duration getMaxReadTime();



    @Nullable

    abstract Instant getStartReadTime();



    abstract boolean isCommitOffsetsInFinalizeEnabled();



    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();



    @Nullable

    abstract Map<String, Object> getOffsetConsumerConfig();



    abstract Builder<K, V> toBuilder();



    @AutoValue.Builder

    abstract static class Builder<K, V>

        implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {

      abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);



      abstract Builder<K, V> setTopics(List<String> topics);



      abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);



      abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);



      abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);



      abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer);



      abstract Builder<K, V> setValueDeserializer(

          Class<? extends Deserializer<V>> valueDeserializer);



      abstract Builder<K, V> setConsumerFactoryFn(

          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);



      abstract Builder<K, V> setWatermarkFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);



      abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);



      abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);



      abstract Builder<K, V> setStartReadTime(Instant startReadTime);



      abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);



      abstract Builder<K, V> setTimestampPolicyFactory(

          TimestampPolicyFactory<K, V> timestampPolicyFactory);



      abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);



      abstract Read<K, V> build();



      @Override

      public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(

          External.Configuration config) {

        ImmutableList.Builder<String> listBuilder = ImmutableList.builder();

        for (byte[] topic : config.topics) {

          listBuilder.add(utf8String(topic));

        }

        setTopics(listBuilder.build());



        String keyDeserializerClassName = utf8String(config.keyDeserializer);

        Class keyDeserializer = resolveClass(keyDeserializerClassName);

        setKeyDeserializer(keyDeserializer);

        setKeyCoder(resolveCoder(keyDeserializer));



        String valueDeserializerClassName = utf8String(config.valueDeserializer);

        Class valueDeserializer = resolveClass(valueDeserializerClassName);

        setValueDeserializer(valueDeserializer);

        setValueCoder(resolveCoder(valueDeserializer));



        Map<String, Object> consumerConfig = new HashMap<>();

        for (KV<byte[], byte[]> kv : config.consumerConfig) {

          String key = utf8String(kv.getKey());

          String value = utf8String(kv.getValue());

          consumerConfig.put(key, value);

        }

        // Key and Value Deserializers always have to be in the config.

        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());

        consumerConfig.put(

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());

        setConsumerConfig(consumerConfig);



        // Set required defaults

        setTopicPartitions(Collections.emptyList());

        setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN);

        setMaxNumRecords(Long.MAX_VALUE);

        setCommitOffsetsInFinalizeEnabled(false);

        setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());

        // We do not include Metadata until we can encode KafkaRecords cross-language

        return build().withoutMetadata();

      }



      private static Coder resolveCoder(Class deserializer) {
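        // Infer a Coder by inspecting the return type of the Deserializer's declared
        // deserialize() method. Only byte[], Integer and Long return types are handled here.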

        for (Method method : deserializer.getDeclaredMethods()) {

          if (method.getName().equals("deserialize")) {

            Class<?> returnType = method.getReturnType();

            if (returnType.equals(Object.class)) {

              continue;

            }

            if (returnType.equals(byte[].class)) {

              return ByteArrayCoder.of();

            } else if (returnType.equals(Integer.class)) {

              return VarIntCoder.of();

            } else if (returnType.equals(Long.class)) {

              return VarLongCoder.of();

            } else {

              throw new RuntimeException("Couldn't infer Coder from " + deserializer);

            }

          }

        }

        throw new RuntimeException("Couldn't resolve coder for Deserializer: " + deserializer);

      }

    }



    /**

     * Exposes {@link KafkaIO.TypedWithoutMetadata} as an external transform for cross-language

     * usage.

     */

    @AutoService(ExternalTransformRegistrar.class)

    public static class External implements ExternalTransformRegistrar {



      public static final String URN = "unified:external:java:kafka:read:v1";



      @Override

      public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {

        return ImmutableMap.of(URN, AutoValue_KafkaIO_Read.Builder.class);

      }



      /** Parameters class to expose the Read transform to an external SDK. */

      public static class Configuration {



        // All byte arrays are UTF-8 encoded strings

        private Iterable<KV<byte[], byte[]>> consumerConfig;

        private Iterable<byte[]> topics;

        private byte[] keyDeserializer;

        private byte[] valueDeserializer;



        public void setConsumerConfig(Iterable<KV<byte[], byte[]>> consumerConfig) {

          this.consumerConfig = consumerConfig;

        }



        public void setTopics(Iterable<byte[]> topics) {

          this.topics = topics;

        }



        public void setKeyDeserializer(byte[] keyDeserializer) {

          this.keyDeserializer = keyDeserializer;

        }



        public void setValueDeserializer(byte[] valueDeserializer) {

          this.valueDeserializer = valueDeserializer;

        }

      }

    }



    /** Sets the bootstrap servers for the Kafka consumer. */

    public Read<K, V> withBootstrapServers(String bootstrapServers) {

      return updateConsumerProperties(

          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));

    }



    /**

     * Sets the topic to read from.

     *

     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the

     * partitions are distributed among the splits.

     */

    public Read<K, V> withTopic(String topic) {

      return withTopics(ImmutableList.of(topic));

    }



    /**

     * Sets a list of topics to read from. All the partitions from each of the topics are read.

     *

     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the

     * partitions are distributed among the splits.

     */

    public Read<K, V> withTopics(List<String> topics) {

      checkState(

          getTopicPartitions().isEmpty(), "Only topics or topicPartitions can be set, not both");

      return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();

    }



    /**

     * Sets a list of partitions to read from. This allows reading only a subset of partitions for

     * one or more topics when (if ever) needed.

     *

     * <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the

     * partitions are distributed among the splits.

     */

    public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {

      checkState(getTopics().isEmpty(), "Only topics or topicPartitions can be set, not both");

      return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();

    }



    /**

     * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.

     *

     * <p>In addition, Unified also needs a {@link Coder} to serialize and deserialize key objects at

     * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,

     * however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to

     * provide the key coder explicitly.

     */

    public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {

      return toBuilder().setKeyDeserializer(keyDeserializer).build();

    }



    /**

     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a

     * {@link Coder} for helping the Unified runner materialize key objects at runtime if necessary.

     *

     * <p>Use this method only if your pipeline doesn't work with plain {@link

     * #withKeyDeserializer(Class)}.
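     *
     * <p>A minimal sketch, assuming the key is a UTF-8 string and a {@code StringUtf8Coder} is
     * available in the SDK coders package:
     *
     * <pre>{@code
     * .withKeyDeserializerAndCoder(StringDeserializer.class, StringUtf8Coder.of())
     * }</pre>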

     */

    public Read<K, V> withKeyDeserializerAndCoder(

        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {

      return toBuilder().setKeyDeserializer(keyDeserializer).setKeyCoder(keyCoder).build();

    }



    /**

     * Sets a Kafka {@link Deserializer} to interpret value bytes read from Kafka.

     *

     * <p>In addition, Unified also needs a {@link Coder} to serialize and deserialize value objects at

     * runtime. KafkaIO tries to infer a coder for the value based on the {@link Deserializer}

     * class, however in case that fails, you can use {@link #withValueDeserializerAndCoder(Class,

     * Coder)} to provide the value coder explicitly.

     */

    public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {

      return toBuilder().setValueDeserializer(valueDeserializer).build();

    }



    /**

     * Sets a Kafka {@link Deserializer} for interpreting value bytes read from Kafka along with a

     * {@link Coder} for helping the Unified runner materialize value objects at runtime if necessary.

     *

     * <p>Use this method only if your pipeline doesn't work with plain {@link

     * #withValueDeserializer(Class)}.

     */

    public Read<K, V> withValueDeserializerAndCoder(

        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {

      return toBuilder().setValueDeserializer(valueDeserializer).setValueCoder(valueCoder).build();

    }



    /**

     * A factory to create a Kafka {@link Consumer} from the consumer configuration. This is useful
     * for supporting another version of the Kafka consumer. Default is {@link KafkaConsumer}.
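     *
     * <p>A minimal sketch (the default factory is equivalent to this):
     *
     * <pre>{@code
     * .withConsumerFactoryFn(config -> new KafkaConsumer<byte[], byte[]>(config))
     * }</pre>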

     */

    public Read<K, V> withConsumerFactoryFn(

        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {

      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();

    }



    /** Update consumer configuration with new properties. */

    public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {

      Map<String, Object> config =

          updateKafkaProperties(getConsumerConfig(), IGNORED_CONSUMER_PROPERTIES, configUpdates);

      return toBuilder().setConsumerConfig(config).build();

    }



    /**

     * Similar to {@link com.bff.gaia.unified.sdk.io.Read.Unbounded#withMaxNumRecords(long)}. Mainly used

     * for tests and demo applications.

     */

    public Read<K, V> withMaxNumRecords(long maxNumRecords) {

      return toBuilder().setMaxNumRecords(maxNumRecords).build();

    }



    /**

     * Use a timestamp to set the start offset. It is only supported by Kafka client 0.10.1.0
     * onwards and message format versions after 0.10.0.

     *

     * <p>Note that this takes priority over the start offset configuration {@code

     * ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} and any auto committed offsets.

     *

     * <p>This results in hard failures in either of the following two cases: 1. If one or more
     * partitions do not contain any messages with a timestamp larger than or equal to the desired

     * timestamp. 2. If the message format version in a partition is before 0.10.0, i.e. the

     * messages do not have timestamps.
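     *
     * <p>A minimal sketch (the timestamp shown is only illustrative):
     *
     * <pre>{@code
     * .withStartReadTime(Instant.parse("2020-01-01T00:00:00Z"))
     * }</pre>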

     */

    public Read<K, V> withStartReadTime(Instant startReadTime) {

      return toBuilder().setStartReadTime(startReadTime).build();

    }



    /**

     * Similar to {@link com.bff.gaia.unified.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}. Mainly

     * used for tests and demo applications.

     */

    public Read<K, V> withMaxReadTime(Duration maxReadTime) {

      return toBuilder().setMaxReadTime(maxReadTime).build();

    }



    /**

     * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}. The

     * policy assigns Kafka's log append time (server side ingestion time) to each record. The

     * watermark for each Kafka partition is the timestamp of the last record read. If a partition

     * is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed

     * from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.

     *

     * <p>In Kafka, log append time needs to be enabled for each topic, and all the subsequent

     * records will have their timestamp set to log append time. If a record does not have its
     * timestamp type set to 'LOG_APPEND_TIME' for any reason, its timestamp is set to the previous
     * record timestamp or the latest watermark, whichever is larger.

     *

     * <p>The watermark for the entire source is the oldest of the partitions' watermarks. If one of
     * the readers falls behind, possibly due to an uneven distribution of records among Kafka
     * partitions, it ends up holding back the watermark for the entire source.

     */

    public Read<K, V> withLogAppendTime() {

      return withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime());

    }



    /**

     * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.ProcessingTimePolicy}. This is

     * the default timestamp policy. It assigns processing time to each record. Specifically, this

     * is the timestamp when the record becomes 'current' in the reader. The watermark always

     * advances to current time. If server side time (log append time) is enabled in Kafka, {@link

     * #withLogAppendTime()} is recommended over this.

     */

    public Read<K, V> withProcessingTime() {

      return withTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime());

    }



    /**

     * Sets the timestamps policy based on {@link KafkaTimestampType#CREATE_TIME} timestamp of the

     * records. It is an error if a record's timestamp type is not {@link

     * KafkaTimestampType#CREATE_TIME}. The timestamps within a partition are expected to be roughly

     * monotonically increasing with a cap on out of order delays (e.g. 'max delay' of 1 minute).

     * The watermark at any time is '({@code Min(now(), Max(event timestamp so far)) - max delay})'.

     * However, the watermark is never set in the future and is capped to 'now - max delay'. In
     * addition, the watermark advances to 'now - max delay' when a partition is idle.

     *

     * @param maxDelay For any record in the Kafka partition, the timestamp of any subsequent record

     *     is expected to be after {@code current record timestamp - maxDelay}.

     */

    public Read<K, V> withCreateTime(Duration maxDelay) {

      return withTimestampPolicyFactory(TimestampPolicyFactory.withCreateTime(maxDelay));

    }



    /**

     * Provide custom {@link TimestampPolicyFactory} to set event times and watermark for each

     * partition. {@link TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} is

     * invoked for each partition when the reader starts.

     *

     * @see #withLogAppendTime()

     * @see #withCreateTime(Duration)

     * @see #withProcessingTime()

     */

    public Read<K, V> withTimestampPolicyFactory(

        TimestampPolicyFactory<K, V> timestampPolicyFactory) {

      return toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build();

    }



    /**

     * A function to assign a timestamp to a record. Default is processing timestamp.

     *

     * @deprecated as of version 2.4. Use {@link

     *     #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.

     */

    @Deprecated

    public Read<K, V> withTimestampFn2(

        SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {

      checkArgument(timestampFn != null, "timestampFn can not be null");

      return toBuilder()

          .setTimestampPolicyFactory(TimestampPolicyFactory.withTimestampFn(timestampFn))

          .build();

    }



    /**

     * A function to calculate watermark after a record. Default is last record timestamp.

     *

     * @see #withTimestampFn(SerializableFunction)

     * @deprecated as of version 2.4. Use {@link

     *     #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.

     */

    @Deprecated

    public Read<K, V> withWatermarkFn2(

        SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {

      checkArgument(watermarkFn != null, "watermarkFn can not be null");

      return toBuilder().setWatermarkFn(watermarkFn).build();

    }



    /**

     * A function to assign a timestamp to a record. Default is processing timestamp.

     *

     * @deprecated as of version 2.4. Use {@link

     *     #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.

     */

    @Deprecated

    public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {

      checkArgument(timestampFn != null, "timestampFn can not be null");

      return withTimestampFn2(unwrapKafkaAndThen(timestampFn));

    }



    /**

     * A function to calculate watermark after a record. Default is last record timestamp.

     *

     * @see #withTimestampFn(SerializableFunction)

     * @deprecated as of version 2.4. Use {@link

     *     #withTimestampPolicyFactory(TimestampPolicyFactory)} instead.

     */

    @Deprecated

    public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {

      checkArgument(watermarkFn != null, "watermarkFn can not be null");

      return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));

    }



    /**

     * Sets "isolation_level" to "read_committed" in Kafka consumer configuration. This is ensures

     * that the consumer does not read uncommitted messages. Kafka version 0.11 introduced

     * transactional writes. Applications requiring end-to-end exactly-once semantics should only

     * read committed messages. See JavaDoc for {@link KafkaConsumer} for more description.

     */

    public Read<K, V> withReadCommitted() {

      return updateConsumerProperties(ImmutableMap.of("isolation.level", "read_committed"));

    }



    /**

     * Finalized offsets are committed to Kafka. See {@link UnboundedSource.CheckpointMark#finalizeCheckpoint()}. It

     * helps with minimizing gaps or duplicate processing of records while restarting a pipeline

     * from scratch. But it does not provide hard processing guarantees. There could be a short

     * delay to commit after {@link UnboundedSource.CheckpointMark#finalizeCheckpoint()} is invoked, as the
     * reader might be blocked on reading from Kafka. Note that it is independent of the 'AUTO_COMMIT'
     * Kafka consumer configuration. Usually either this or AUTO_COMMIT in the Kafka consumer is
     * enabled, but not both.

     */

    public Read<K, V> commitOffsetsInFinalize() {

      return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();

    }



    /**

     * Set additional configuration for the backend offset consumer. It may be required for a

     * secured Kafka cluster, especially when you see a WARN log message similar to 'exception while

     * fetching latest offset for partition {}. will be retried'.

     *

     * <p>In {@link KafkaIO#read()}, there are actually two consumers running in the backend:<br>
     * 1. the main consumer, which reads data from Kafka;<br>
     * 2. the secondary offset consumer, which is used to estimate the backlog by fetching the latest
     * offset;<br>

     *

     * <p>By default, the offset consumer inherits the configuration from the main consumer, with an
     * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
     * cluster which requires more configuration.
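     *
     * <p>A minimal sketch (the security property shown is a placeholder for whatever your cluster
     * requires; it is not prescribed by KafkaIO):
     *
     * <pre>{@code
     * .withOffsetConsumerConfigOverrides(ImmutableMap.of("security.protocol", "SASL_SSL"))
     * }</pre>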

     */

    public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {

      return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();

    }



    /** Returns a {@link PTransform} producing a {@link PCollection} of {@link KV}, dropping the Kafka metadata. */

    public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {

      return new TypedWithoutMetadata<>(this);

    }



    @Override

    public PCollection<KafkaRecord<K, V>> expand(PBegin input) {

      checkArgument(

          getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,

          "withBootstrapServers() is required");

      checkArgument(

          getTopics().size() > 0 || getTopicPartitions().size() > 0,

          "Either withTopic(), withTopics() or withTopicPartitions() is required");

      checkArgument(getKeyDeserializer() != null, "withKeyDeserializer() is required");

      checkArgument(getValueDeserializer() != null, "withValueDeserializer() is required");

      ConsumerSpEL consumerSpEL = new ConsumerSpEL();



      if (!consumerSpEL.hasOffsetsForTimes()) {

        LOG.warn(

            "Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and "

                + "may not be supported in next release of Apache Unified. "

                + "Please upgrade your Kafka client version.",

            AppInfoParser.getVersion());

      }

      if (getStartReadTime() != null) {

        checkArgument(

            consumerSpEL.hasOffsetsForTimes(),

            "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "

                + "current version of Kafka Client is "

                + AppInfoParser.getVersion()

                + ". If you are building with maven, set \"kafka.clients.version\" "

                + "maven property to 0.10.1.0 or newer.");

      }

      if (isCommitOffsetsInFinalizeEnabled()) {

        checkArgument(

            getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null,

            "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config "

                + "is not set. Offset management requires group.id.");

        if (Boolean.TRUE.equals(

            getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {

          LOG.warn(

              "'{}' in consumer config is enabled even though commitOffsetsInFinalize() "

                  + "is set. You need only one of them.",

              ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);

        }

      }



      // Infer key/value coders if not specified explicitly

      CoderRegistry registry = input.getPipeline().getCoderRegistry();



      Coder<K> keyCoder =

          getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer());

      checkArgument(

          keyCoder != null,

          "Key coder could not be inferred from key deserializer. Please provide"

              + "key coder explicitly using withKeyDeserializerAndCoder()");



      Coder<V> valueCoder =

          getValueCoder() != null ? getValueCoder() : inferCoder(registry, getValueDeserializer());

      checkArgument(

          valueCoder != null,

          "Value coder could not be inferred from value deserializer. Please provide"

              + "value coder explicitly using withValueDeserializerAndCoder()");



      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.

      com.bff.gaia.unified.sdk.io.Read.Unbounded<KafkaRecord<K, V>> unbounded =

          com.bff.gaia.unified.sdk.io.Read.from(

              toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());



      PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;



      if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {

        transform =

            unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());

      }



      return input.getPipeline().apply(transform);

    }



    /**

     * Creates an {@link UnboundedSource UnboundedSource&lt;KafkaRecord&lt;K, V&gt;, ?&gt;} with the

     * configuration in {@link Read}. The primary use case is unit tests; this should not be used in
     * an application.

     */

    @VisibleForTesting

    UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {

      return new KafkaUnboundedSource<>(this, -1);

    }



    // utility method to convert KafkaRecord<K, V> to user KV<K, V> before applying user functions

    private static <KeyT, ValueT, OutT>

        SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT> unwrapKafkaAndThen(

            final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {

      return record -> fn.apply(record.getKV());

    }

    ///////////////////////////////////////////////////////////////////////////////////////



    /** A set of properties that are not required or don't make sense for our consumer. */

    private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES =

        ImmutableMap.of(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"

            // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :

            //     let's allow these; applications can have a better resume point for restarts.

            );



    // set config defaults

    private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =

        ImmutableMap.of(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            ByteArrayDeserializer.class.getName(),

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            ByteArrayDeserializer.class.getName(),



            // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.

            // with the default value of 32K, it takes multiple seconds between successful polls.

            // All the consumer work is done inside poll(), with smaller send buffer size, it

            // takes many polls before a 1MB chunk from the server is fully read. In my testing

            // about half of the time select() inside kafka consumer waited for 20-30ms, though

            // the server had lots of data in tcp send buffers on its side. Compared to default,

            // this setting increased throughput by many fold (3-4x).

            ConsumerConfig.RECEIVE_BUFFER_CONFIG,

            512 * 1024,



            // default to latest offset when we are not resuming.

            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,

            "latest",

            // disable auto commit of offsets. we don't require group_id. could be enabled by user.

            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,

            false);



    // Default Consumer supplier: creates a KafkaConsumer from the given config.

    private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>

        KAFKA_CONSUMER_FACTORY_FN = KafkaConsumer::new;



    @SuppressWarnings("unchecked")

    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      List<String> topics = getTopics();

      List<TopicPartition> topicPartitions = getTopicPartitions();

      if (topics.size() > 0) {

        builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));

      } else if (topicPartitions.size() > 0) {

        builder.add(

            DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))

                .withLabel("Topic Partition/s"));

      }

      Set<String> ignoredConsumerPropertiesKeys = IGNORED_CONSUMER_PROPERTIES.keySet();

      for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {

        String key = conf.getKey();

        if (!ignoredConsumerPropertiesKeys.contains(key)) {

          Object value =

              DisplayData.inferType(conf.getValue()) != null

                  ? conf.getValue()

                  : String.valueOf(conf.getValue());

          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));

        }

      }

    }

  }



  /**

   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but removes

   * Kafka metadata and returns a {@link PCollection} of {@link KV}. See {@link KafkaIO} for more

   * information on usage and configuration of reader.

   */

  public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {

    private final Read<K, V> read;



    TypedWithoutMetadata(Read<K, V> read) {

      super("KafkaIO.Read");

      this.read = read;

    }



    @Override

    public PCollection<KV<K, V>> expand(PBegin begin) {

      return begin

          .apply(read)

          .apply(

              "Remove Kafka Metadata",

              ParDo.of(

                  new DoFn<KafkaRecord<K, V>, KV<K, V>>() {

                    @ProcessElement

                    public void processElement(ProcessContext ctx) {

                      ctx.output(ctx.element().getKV());

                    }

                  }));

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      read.populateDisplayData(builder);

    }

  }



  ////////////////////////////////////////////////////////////////////////////////////////////////



  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);



  /**

   * Returns a new config map which is a merge of the current config and the updates. Verifies that
   * the updates do not include ignored properties.

   */

  private static Map<String, Object> updateKafkaProperties(

      Map<String, Object> currentConfig,

      Map<String, String> ignoredProperties,

      Map<String, Object> updates) {



    for (String key : updates.keySet()) {

      checkArgument(

          !ignoredProperties.containsKey(key),

          "No need to configure '%s'. %s",

          key,

          ignoredProperties.get(key));

    }



    Map<String, Object> config = new HashMap<>(currentConfig);

    config.putAll(updates);



    return config;

  }



  /** Static class, prevent instantiation. */

  private KafkaIO() {}



  //////////////////////// Sink Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\



  /**

   * A {@link PTransform} to write to a Kafka topic with {@link ProducerRecord}s. See {@link KafkaIO} for

   * more information on usage and configuration.

   */

  @AutoValue

  public abstract static class WriteRecords<K, V>

      extends PTransform<PCollection<ProducerRecord<K, V>>, PDone> {

    // TODO (Version 3.0): Create a single generic {@code Write<T>} transform which will be
    // parameterized depending on the type of input collection (KV, ProducerRecords, etc.). In that case,

    // we shouldn't have to duplicate the same API for similar transforms like {@link Write} and

    // {@link WriteRecords}. See example at {@link PubsubIO.Write}.



    @Nullable

    abstract String getTopic();



    abstract Map<String, Object> getProducerConfig();



    @Nullable

    abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();



    @Nullable

    abstract Class<? extends Serializer<K>> getKeySerializer();



    @Nullable

    abstract Class<? extends Serializer<V>> getValueSerializer();



    @Nullable

    abstract KafkaPublishTimestampFunction<ProducerRecord<K, V>> getPublishTimestampFunction();



    // Configuration for EOS sink

    abstract boolean isEOS();



    @Nullable

    abstract String getSinkGroupId();



    abstract int getNumShards();



    @Nullable

    abstract SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>>

        getConsumerFactoryFn();



    abstract Builder<K, V> toBuilder();



    @AutoValue.Builder

    abstract static class Builder<K, V> {

      abstract Builder<K, V> setTopic(String topic);



      abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);



      abstract Builder<K, V> setProducerFactoryFn(

          SerializableFunction<Map<String, Object>, Producer<K, V>> fn);



      abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);



      abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);



      abstract Builder<K, V> setPublishTimestampFunction(

          KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction);



      abstract Builder<K, V> setEOS(boolean eosEnabled);



      abstract Builder<K, V> setSinkGroupId(String sinkGroupId);



      abstract Builder<K, V> setNumShards(int numShards);



      abstract Builder<K, V> setConsumerFactoryFn(

          SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> fn);



      abstract WriteRecords<K, V> build();

    }



    /**

     * Returns a new {@link WriteRecords} transform with the Kafka producer pointing to {@code

     * bootstrapServers}.

     */

    public WriteRecords<K, V> withBootstrapServers(String bootstrapServers) {

      return updateProducerProperties(

          ImmutableMap.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));

    }



    /**

     * Sets the default Kafka topic to write to. Use {@code ProducerRecord}s to set the topic name per

     * published record.

     */

    public WriteRecords<K, V> withTopic(String topic) {

      return toBuilder().setTopic(topic).build();

    }



    /**

     * Sets a {@link Serializer} for serializing key (if any) to bytes.

     *

     * <p>A key is optional while writing to Kafka. Note when a key is set, its hash is used to

     * determine partition in Kafka (see {@link ProducerRecord} for more details).

     */

    public WriteRecords<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {

      return toBuilder().setKeySerializer(keySerializer).build();

    }



    /** Sets a {@link Serializer} for serializing value to bytes. */

    public WriteRecords<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {

      return toBuilder().setValueSerializer(valueSerializer).build();

    }



    /**

     * Adds the given producer properties, overriding old values of properties with the same key.

     */

    public WriteRecords<K, V> updateProducerProperties(Map<String, Object> configUpdates) {

      Map<String, Object> config =

          updateKafkaProperties(getProducerConfig(), IGNORED_PRODUCER_PROPERTIES, configUpdates);

      return toBuilder().setProducerConfig(config).build();

    }



    /**

     * Sets a custom function to create the Kafka producer. Primarily used for tests. Default is
     * {@link KafkaProducer}.

     */

    public WriteRecords<K, V> withProducerFactoryFn(

        SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {

      return toBuilder().setProducerFactoryFn(producerFactoryFn).build();

    }



    /**

     * The timestamp for each record being published is set to the timestamp of the element in the

     * pipeline. This is equivalent to {@code withPublishTimestampFunction((e, ts) -> ts)}. <br>

     * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is

     * processing messages from the past, they might be deleted immediately by Kafka after being

     * published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.

     */

    public WriteRecords<K, V> withInputTimestamp() {

      return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());

    }



    /**

     * A function to provide timestamp for records being published. <br>

     * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline is

     * processing messages from the past, they might be deleted immediately by Kafka after being

     * published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.

     *

     * @deprecated use {@code ProducerRecords} to set publish timestamp.

     */

    @Deprecated

    public WriteRecords<K, V> withPublishTimestampFunction(

        KafkaPublishTimestampFunction<ProducerRecord<K, V>> timestampFunction) {

      return toBuilder().setPublishTimestampFunction(timestampFunction).build();

    }



    /**

     * Provides exactly-once semantics while writing to Kafka, which enables applications with

     * end-to-end exactly-once guarantees on top of exactly-once semantics <i>within</i> Unified

     * pipelines. It ensures that records written to the sink are committed on Kafka exactly once,
     * even when some processing is retried during pipeline execution.

     * Retries typically occur when workers restart (as in failure recovery), or when the work is

     * redistributed (as in an autoscaling event).

     *

     * <p>Unified runners typically provide exactly-once semantics for results of a pipeline, but not

     * for side effects from user code in transforms. If a transform such as the Kafka sink writes to an

     * external system, those writes might occur more than once. When EOS is enabled here, the sink

     * transform ties checkpointing semantics in compatible Unified runners and transactions in Kafka

     * (version 0.11+) to ensure a record is written only once. As the implementation relies on the
     * runner's checkpoint semantics, not all runners are compatible. The sink throws an exception
     * during initialization if the runner is not whitelisted. The Gaia runner is one of the runners
     * whose checkpoint semantics are not compatible with the current implementation (we hope to
     * provide a solution in the near future). The Dataflow and Spark runners are whitelisted as

     * compatible.

     *

     * <p>Note on performance: Exactly-once sink involves two shuffles of the records. In addition

     * to the cost of shuffling the records among workers, the records go through 2

     * serialization-deserialization cycles. Depending on volume and cost of serialization, the CPU

     * cost might be noticeable. The CPU cost can be reduced by writing byte arrays (i.e.

     * serializing them to bytes before writing to the Kafka sink).

     *

     * @param numShards Sets sink parallelism. The state metadata stored on Kafka is stored across

     *     this many virtual partitions using {@code sinkGroupId}. A good rule of thumb is to set

     *     this to be around the number of partitions in the Kafka topic.

     * @param sinkGroupId The <i>group id</i> used to store small amount of state as metadata on

     *     Kafka. It is similar to <i>consumer group id</i> used with a {@link KafkaConsumer}. Each

     *     job should use a unique group id so that restarts/updates of job preserve the state to

     *     ensure exactly-once semantics. The state is committed atomically with sink transactions

     *     on Kafka. See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)} for more

     *     information. The sink performs multiple sanity checks during initialization to catch

     *     common mistakes so that it does not end up using state that does not <i>seem</i> to be

     *     written by the same job.

     */

    public WriteRecords<K, V> withEOS(int numShards, String sinkGroupId) {

      KafkaExactlyOnceSink.ensureEOSSupport();

      checkArgument(numShards >= 1, "numShards should be >= 1");

      checkArgument(sinkGroupId != null, "sinkGroupId is required for exactly-once sink");

      return toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build();

    }



    /**

     * When exactly-once semantics are enabled (see {@link #withEOS(int, String)}), the sink needs

     * to fetch previously stored state from Kafka. Fetching this metadata requires a consumer.

     * Similar to {@link Read#withConsumerFactoryFn(SerializableFunction)}, a factory function can

     * be supplied if required in a specific case. The default is {@link KafkaConsumer}.

     */

    public WriteRecords<K, V> withConsumerFactoryFn(

        SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {

      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();

    }



    @Override

    public PDone expand(PCollection<ProducerRecord<K, V>> input) {

      checkArgument(

          getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,

          "withBootstrapServers() is required");



      checkArgument(getKeySerializer() != null, "withKeySerializer() is required");

      checkArgument(getValueSerializer() != null, "withValueSerializer() is required");



      if (isEOS()) {

        checkArgument(getTopic() != null, "withTopic() is required when isEOS() is true");

        KafkaExactlyOnceSink.ensureEOSSupport();



        // TODO: Verify that the group_id does not have existing state stored on Kafka unless
        //       this is an upgrade. This avoids issues caused by the simple mistake of reusing a
        //       group_id across multiple runs or across multiple jobs. This is currently checked
        //       when the sink transform initializes while processing the output. It might be
        //       better to check it here so the mistake is caught earlier.



        input.apply(new KafkaExactlyOnceSink<>(this));

      } else {

        input.apply(ParDo.of(new KafkaWriter<>(this)));

      }

      return PDone.in(input.getPipeline());

    }



    @Override

    public void validate(PipelineOptions options) {

      if (isEOS()) {

        String runner = options.getRunner().getName();

        if ("com.bff.gaia.unified.runners.direct.DirectRunner".equals(runner)

            || runner.startsWith("com.bff.gaia.unified.runners.dataflow.")

            || runner.startsWith("com.bff.gaia.unified.runners.spark.")

            || runner.startsWith("com.bff.gaia.unified.runners.gaia.")) {

          return;

        }

        throw new UnsupportedOperationException(
            runner
                + " is not whitelisted among runners compatible with the Kafka exactly-once sink. "
                + "This implementation of the exactly-once sink relies on specific checkpoint guarantees. "
                + "Only runners known to have compatible checkpoint semantics are whitelisted.");

      }

    }



    // set config defaults

    private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =

        ImmutableMap.of(ProducerConfig.RETRIES_CONFIG, 3);



    /** A set of properties that are not required or don't make sense for our producer. */

    private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES =

        ImmutableMap.of(

            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",

            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead");



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      builder.addIfNotNull(DisplayData.item("topic", getTopic()).withLabel("Topic"));

      Set<String> ignoredProducerPropertiesKeys = IGNORED_PRODUCER_PROPERTIES.keySet();

      for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {

        String key = conf.getKey();

        if (!ignoredProducerPropertiesKeys.contains(key)) {

          Object value =

              DisplayData.inferType(conf.getValue()) != null

                  ? conf.getValue()

                  : String.valueOf(conf.getValue());

          builder.add(DisplayData.item(key, ValueProvider.StaticValueProvider.of(value)));

        }

      }

    }

  }



  /**

   * A {@link PTransform} to write to a Kafka topic with KVs. See {@link KafkaIO} for more

   * information on usage and configuration.
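   *
   * <p>For illustration, a minimal sketch (the bootstrap servers and topic are hypothetical; this
   * assumes the KV-based {@code KafkaIO.write()} entry point and Kafka's {@code LongSerializer}):
   *
   * <pre>{@code
   * PCollection<KV<Long, String>> kvs = ...;
   * kvs.apply(KafkaIO.<Long, String>write()
   *     .withBootstrapServers("broker_1:9092,broker_2:9092")
   *     .withTopic("results")
   *     .withKeySerializer(LongSerializer.class)
   *     .withValueSerializer(StringSerializer.class));
   * }</pre>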

   */

  @AutoValue

  public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {

    // TODO (Version 3.0): Create a single generic {@code Write<T>} transform parameterized by the
    // type of the input collection (KV, ProducerRecords, etc.). That way we wouldn't have to
    // duplicate the same API for similar transforms like {@link Write} and {@link WriteRecords}.
    // See {@link PubsubIO.Write} for an example.



    @Nullable

    abstract String getTopic();



    abstract WriteRecords<K, V> getWriteRecordsTransform();



    abstract Builder<K, V> toBuilder();



    @AutoValue.Builder

    abstract static class Builder<K, V>

        implements ExternalTransformBuilder<External.Configuration, PCollection<KV<K, V>>, PDone> {

      abstract Builder<K, V> setTopic(String topic);



      abstract Builder<K, V> setWriteRecordsTransform(WriteRecords<K, V> transform);



      abstract Write<K, V> build();



      @Override

      public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(

          External.Configuration configuration) {

        String topic = utf8String(configuration.topic);

        setTopic(topic);



        Map<String, Object> producerConfig = new HashMap<>();

        for (KV<byte[], byte[]> kv : configuration.producerConfig) {

          String key = utf8String(kv.getKey());

          String value = utf8String(kv.getValue());

          producerConfig.put(key, value);

        }

        Class keySerializer = resolveClass(utf8String(configuration.keySerializer));

        Class valSerializer = resolveClass(utf8String(configuration.valueSerializer));



        WriteRecords<K, V> writeRecords =

            KafkaIO.<K, V>writeRecords()

                .updateProducerProperties(producerConfig)

                .withKeySerializer(keySerializer)

                .withValueSerializer(valSerializer)

                .withTopic(topic);

        setWriteRecordsTransform(writeRecords);



        return build();

      }

    }



    /** Exposes {@link KafkaIO.Write} as an external transform for cross-language usage. */

    @AutoService(ExternalTransformRegistrar.class)

    public static class External implements ExternalTransformRegistrar {



      public static final String URN = "unified:external:java:kafka:write:v1";



      @Override

      public Map<String, Class<? extends ExternalTransformBuilder>> knownBuilders() {

        return ImmutableMap.of(URN, AutoValue_KafkaIO_Write.Builder.class);

      }



      /** Parameters class to expose the Write transform to an external SDK. */

      public static class Configuration {



        // All byte arrays are UTF-8 encoded strings

        private Iterable<KV<byte[], byte[]>> producerConfig;

        private byte[] topic;

        private byte[] keySerializer;

        private byte[] valueSerializer;



        public void setProducerConfig(Iterable<KV<byte[], byte[]>> producerConfig) {

          this.producerConfig = producerConfig;

        }



        public void setTopic(byte[] topic) {

          this.topic = topic;

        }



        public void setKeySerializer(byte[] keySerializer) {

          this.keySerializer = keySerializer;

        }



        public void setValueSerializer(byte[] valueSerializer) {

          this.valueSerializer = valueSerializer;

        }

      }

    }



    /** Used mostly to reduce the boilerplate of wrapping {@link WriteRecords} methods. */

    private Write<K, V> withWriteRecordsTransform(WriteRecords<K, V> transform) {

      return toBuilder().setWriteRecordsTransform(transform).build();

    }



    /**

     * Wrapper method over {@link WriteRecords#withBootstrapServers(String)}, used to keep
     * compatibility with the old KV-based API.

     */

    public Write<K, V> withBootstrapServers(String bootstrapServers) {

      return withWriteRecordsTransform(

          getWriteRecordsTransform().withBootstrapServers(bootstrapServers));

    }



    /**

     * Wrapper method over {@link WriteRecords#withTopic(String)}, used to keep compatibility with
     * the old KV-based API.

     */

    public Write<K, V> withTopic(String topic) {

      return toBuilder()

          .setTopic(topic)

          .setWriteRecordsTransform(getWriteRecordsTransform().withTopic(topic))

          .build();

    }



    /**

     * Wrapper method over {@link WriteRecords#withKeySerializer(Class)}, used to keep
     * compatibility with the old KV-based API.

     */

    public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {

      return withWriteRecordsTransform(getWriteRecordsTransform().withKeySerializer(keySerializer));

    }



    /**

     * Wrapper method over {@link WriteRecords#withValueSerializer(Class)}, used to keep
     * compatibility with the old KV-based API.

     */

    public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {

      return withWriteRecordsTransform(

          getWriteRecordsTransform().withValueSerializer(valueSerializer));

    }



    /**

     * Wrapper method over {@link WriteRecords#withProducerFactoryFn(SerializableFunction)}, used
     * to keep compatibility with the old KV-based API.

     */

    public Write<K, V> withProducerFactoryFn(

        SerializableFunction<Map<String, Object>, Producer<K, V>> producerFactoryFn) {

      return withWriteRecordsTransform(

          getWriteRecordsTransform().withProducerFactoryFn(producerFactoryFn));

    }



    /**

     * Wrapper method over {@link WriteRecords#withInputTimestamp()}, used to keep compatibility
     * with the old KV-based API.

     */

    public Write<K, V> withInputTimestamp() {

      return withWriteRecordsTransform(getWriteRecordsTransform().withInputTimestamp());

    }



    /**

     * Wrapper method over {@link
     * WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}, used to keep
     * compatibility with the old KV-based API.

     *

     * @deprecated use {@link WriteRecords} and {@code ProducerRecords} to set publish timestamp.

     */

    @Deprecated

    @SuppressWarnings({"unchecked", "rawtypes"})

    public Write<K, V> withPublishTimestampFunction(

        KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {

      return withWriteRecordsTransform(

          getWriteRecordsTransform()

              .withPublishTimestampFunction(new PublishTimestampFunctionKV(timestampFunction)));

    }



    /**

     * Wrapper method over {@link WriteRecords#withEOS(int, String)}, used to keep compatibility
     * with the old KV-based API.

     */

    public Write<K, V> withEOS(int numShards, String sinkGroupId) {

      return withWriteRecordsTransform(getWriteRecordsTransform().withEOS(numShards, sinkGroupId));

    }



    /**

     * Wrapper method over {@link WriteRecords#withConsumerFactoryFn(SerializableFunction)}, used
     * to keep compatibility with the old KV-based API.

     */

    public Write<K, V> withConsumerFactoryFn(

        SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {

      return withWriteRecordsTransform(

          getWriteRecordsTransform().withConsumerFactoryFn(consumerFactoryFn));

    }



    /**

     * Adds the given producer properties, overriding old values of properties with the same key.
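     *
     * <p>For illustration, a minimal sketch ({@code write} is a hypothetical {@code Write}
     * instance and the compression setting is only an example):
     *
     * <pre>{@code
     * write = write.updateProducerProperties(
     *     ImmutableMap.of(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"));
     * }</pre>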

     */

    public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {

      return withWriteRecordsTransform(

          getWriteRecordsTransform().updateProducerProperties(configUpdates));

    }



    @Override

    public PDone expand(PCollection<KV<K, V>> input) {

      checkArgument(getTopic() != null, "withTopic() is required");



      KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();

      return input

          .apply(

              "Kafka ProducerRecord",

              MapElements.via(

                  new SimpleFunction<KV<K, V>, ProducerRecord<K, V>>() {

                    @Override

                    public ProducerRecord<K, V> apply(KV<K, V> element) {

                      return new ProducerRecord<>(getTopic(), element.getKey(), element.getValue());

                    }

                  }))

          .setCoder(ProducerRecordCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()))

          .apply(getWriteRecordsTransform());

    }



    @Override

    public void validate(PipelineOptions options) {

      getWriteRecordsTransform().validate(options);

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      getWriteRecordsTransform().populateDisplayData(builder);

    }



    /**

     * Writes just the values to Kafka. This is useful for writing collections of values rather
     * than {@link KV}s.
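     *
     * <p>For illustration, a minimal sketch (the bootstrap servers and topic are hypothetical;
     * this assumes the KV-based {@code KafkaIO.write()} entry point):
     *
     * <pre>{@code
     * PCollection<String> strings = ...;
     * strings.apply(KafkaIO.<Void, String>write()
     *     .withBootstrapServers("broker_1:9092,broker_2:9092")
     *     .withTopic("results")
     *     .withValueSerializer(StringSerializer.class)
     *     .values());
     * }</pre>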

     */

    @SuppressWarnings({"unchecked", "rawtypes"})

    public PTransform<PCollection<V>, PDone> values() {

      return new KafkaValueWrite<K, V>(this.withKeySerializer((Class) StringSerializer.class));

    }



    /**

     * Wrapper class that allows using a {@code KafkaPublishTimestampFunction<KV<K, V>>} with
     * {@link WriteRecords#withPublishTimestampFunction(KafkaPublishTimestampFunction)}.

     */

    private static class PublishTimestampFunctionKV<K, V>

        implements KafkaPublishTimestampFunction<ProducerRecord<K, V>> {



      private KafkaPublishTimestampFunction<KV<K, V>> fn;



      public PublishTimestampFunctionKV(KafkaPublishTimestampFunction<KV<K, V>> fn) {

        this.fn = fn;

      }



      @Override

      public Instant getTimestamp(ProducerRecord<K, V> e, Instant ts) {

        return fn.getTimestamp(KV.of(e.key(), e.value()), ts);

      }

    }

  }



  /**

   * Same as {@code Write<K, V>} without a key. Null is used for the key, as is the convention in
   * Kafka when no key is specified. The majority of Kafka writers don't specify a key.

   */

  private static class KafkaValueWrite<K, V> extends PTransform<PCollection<V>, PDone> {

    private final Write<K, V> kvWriteTransform;



    private KafkaValueWrite(Write<K, V> kvWriteTransform) {

      this.kvWriteTransform = kvWriteTransform;

    }



    @Override

    public PDone expand(PCollection<V> input) {

      return input

          .apply(

              "Kafka values with default key",

              MapElements.via(

                  new SimpleFunction<V, KV<K, V>>() {

                    @Override

                    public KV<K, V> apply(V element) {

                      return KV.of(null, element);

                    }

                  }))

          .setCoder(KvCoder.of(new NullOnlyCoder<>(), input.getCoder()))

          .apply(kvWriteTransform);

    }



    @Override

    public void populateDisplayData(DisplayData.Builder builder) {

      super.populateDisplayData(builder);

      kvWriteTransform.populateDisplayData(builder);

    }

  }



  /**
   * A {@link Coder} that can encode and decode only {@code null} values (encoded as zero bytes).
   * Used by {@link KafkaValueWrite} for the always-null key.
   */
  private static class NullOnlyCoder<T> extends AtomicCoder<T> {

    @Override

    public void encode(T value, OutputStream outStream) {

      checkArgument(value == null, "Can only encode nulls");

      // Encode as no bytes.

    }



    @Override

    public T decode(InputStream inStream) {

      return null;

    }

  }



  /**

   * Attempt to infer a {@link Coder} by extracting the type of the deserialized class from the

   * deserializer argument using the {@link Coder} registry.
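   *
   * <p>For illustration, a minimal sketch of how this might be exercised in a test (assuming
   * {@code CoderRegistry.createDefault()} is available, a coder for {@code String} is registered,
   * and {@code StringDeserializer} refers to Kafka's string deserializer):
   *
   * <pre>{@code
   * NullableCoder<String> coder =
   *     inferCoder(CoderRegistry.createDefault(), StringDeserializer.class);
   * }</pre>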

   */

  @VisibleForTesting

  static <T> NullableCoder<T> inferCoder(

      CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {

    checkNotNull(deserializer);



    for (Type type : deserializer.getGenericInterfaces()) {

      if (!(type instanceof ParameterizedType)) {

        continue;

      }



      // This does not recurse: we will not infer from a class that extends

      // a class that extends Deserializer<T>.

      ParameterizedType parameterizedType = (ParameterizedType) type;



      if (parameterizedType.getRawType() == Deserializer.class) {

        Type parameter = parameterizedType.getActualTypeArguments()[0];



        @SuppressWarnings("unchecked")

        Class<T> clazz = (Class<T>) parameter;



        try {

          return NullableCoder.of(coderRegistry.getCoder(clazz));

        } catch (CannotProvideCoderException e) {
          // Chain the original exception so the root cause is not lost.
          throw new RuntimeException(
              String.format(
                  "Unable to automatically infer a Coder for "
                      + "the Kafka Deserializer %s: no coder registered for type %s",
                  deserializer, clazz),
              e);

        }

      }

    }



    throw new RuntimeException(

        String.format("Could not extract the Kafka Deserializer type from %s", deserializer));

  }



  private static String utf8String(byte[] bytes) {

    return new String(bytes, Charsets.UTF_8);

  }



  private static Class resolveClass(String className) {

    try {

      return Class.forName(className);

    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Could not find class: " + className, e);

    }

  }

}